package com.springbootblog.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * RabbitMQ direct配置类
 */
@Configuration
// 开启RabbitMQ注解模式
@EnableRabbit
public class RabbitMQDirectConfig
{
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String userName;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.listener.prefetch}")
    private int prefetch;
    @Value("${spring.rabbitmq.listener.concurrency}")
    private int concurrentConsumers;
    @Value("${spring.rabbitmq.listener.max-concurrency}")
    private int maxConcurrentConsumers;

    /**
     * 链接RabbitMQ
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory()
    {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(true); //必须要设置
        return connectionFactory;
    }

    /**
     * 配置RabbitMQ参数
     * @return
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        //设置最小并发的消费者数量
        factory.setConcurrentConsumers(concurrentConsumers);
        //设置最大并发的消费者数量
        factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
        //限流，单位时间内消费多少条记录
        factory.setPrefetchCount(prefetch);
        // json转消息
        //factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置rabbit 确认消息的模式，默认是自动确认
        //factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //设置rabbit 确认消息的模式，默认是自动确认
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /**
     * 回调函数
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory)
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Manatory，才能触发回调函数，无论消息推送结果怎么样都会强制调用回调函数
        rabbitTemplate.setMandatory(true);

        // 设置确认发送到交换机的回调函数 =》 消息推送到server，但是在server里找不到交换机 / 消息推送到sever，交换机和队列啥都没找到 / 消息推送到server，找到交换机了，但是没找到队列 / 消息推送成功
        rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {
            System.out.println("相关数据："+correlationData);
            System.out.println("确认情况："+b);
            System.out.println("原因："+s);
            System.out.println("===============================");
        });

        //设置确认消息已发送到队列的回调  =》 消息推送到server，找到交换机了，但是没找到队列 触发这个回调函数
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            System.out.println("交换机为："+returnedMessage.getExchange());
            System.out.println("返回消息为："+returnedMessage.getMessage());
            System.out.println("路由键为："+returnedMessage.getRoutingKey());
            System.out.println("回应消息为："+returnedMessage.getReplyText());
            System.out.println("回应代码为："+returnedMessage.getReplyCode());
            System.out.println("===============================");
        });
        return rabbitTemplate;
    }

    /**
     * 队列
     * @return
     */
    @Bean
    public Queue queue()
    {
        return new Queue("test_queue_1", true);
    }

    /**
     * 交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange()
    {
        return new DirectExchange("test_exchange");
    }

    /**
     * 队列绑定交换机
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange)
    {
        return BindingBuilder.bind(queue).to(directExchange()).with("test_queue_1");
    }
}

